Skip to content

feat(webrtc): LiveKit broker backend#2518

Open
ruthwikdasyam wants to merge 7 commits into
feat/webrtc-transportfrom
ruthwik/feat/livekit
Open

feat(webrtc): LiveKit broker backend#2518
ruthwikdasyam wants to merge 7 commits into
feat/webrtc-transportfrom
ruthwik/feat/livekit

Conversation

@ruthwikdasyam

@ruthwikdasyam ruthwikdasyam commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Problem

LiveKit as an alternative transport — simpler signaling (no SDP relay / SCTP-id juggling)

Closes DIM-XXX

Solution

  • New LiveKitBrokerProvider (providers/livekit_broker.py): asks the dimensional-teleop broker for a LiveKit room + JWT, connects straight to the SFU, and exposes the same bytes pub/sub + video contract as BrokerProvider. Topics (cmd_unreliable / state_reliable / state_reliable_back) are kept identical so the typed-fingerprint demux is unchanged.
  • LiveKitTransport / LiveKitVideoTransport in core/transport.py — drop-in alternatives to the Cloudflare classes (just swap _config_cls).
  • New blueprint teleop-hosted-go2-livekit; backend is chosen purely by which transport the blueprint wires.
  • Packaging: livekit optional extra (pip install dimos[livekit]); module imports cleanly without it via a find_spec gate.

publish/video paths mirror BrokerProvider (schedule under the lock; 16-bit frames scaled, not truncated). No changes to the merged Cloudflare path.

How to Test

TELEOP_API_KEY=dtk_live_... dimos run teleop-hosted-go2-livekit

Contributor License Agreement

  • I have read and approved the CLA.

@codecov

codecov Bot commented Jun 17, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 24.74747% with 149 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
...col/pubsub/impl/webrtc/providers/livekit_broker.py 21.98% 149 Missing ⚠️
@@                    Coverage Diff                    @@
##           feat/webrtc-transport    #2518      +/-   ##
=========================================================
+ Coverage                  69.10%   70.08%   +0.98%     
=========================================================
  Files                        842      823      -19     
  Lines                      74686    71896    -2790     
  Branches                    6708     6565     -143     
=========================================================
- Hits                       51610    50387    -1223     
+ Misses                     21409    19897    -1512     
+ Partials                    1667     1612      -55     
Flag Coverage Δ
OS-ubuntu-24.04-arm 64.27% <23.97%> (+0.66%) ⬆️
OS-ubuntu-latest 65.15% <23.97%> (+0.71%) ⬆️
Py-3.10 65.14% <23.97%> (+0.71%) ⬆️
Py-3.11 65.14% <23.97%> (+0.71%) ⬆️
Py-3.12 65.14% <23.97%> (+0.71%) ⬆️
Py-3.13 65.15% <23.97%> (+0.71%) ⬆️
Py-3.14 65.15% <23.97%> (+0.71%) ⬆️
Py-3.14t 65.14% <23.97%> (+0.71%) ⬆️
SelfHosted-Large 30.50% <23.97%> (?)
SelfHosted-Linux 38.60% <24.74%> (+0.41%) ⬆️
SelfHosted-macOS 37.25% <24.74%> (+0.37%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
dimos/core/transport.py 62.59% <100.00%> (+0.70%) ⬆️
dimos/robot/all_blueprints.py 100.00% <ø> (ø)
dimos/robot/test_all_blueprints.py 87.50% <ø> (ø)
dimos/teleop/quest_hosted/blueprints.py 100.00% <100.00%> (ø)
...col/pubsub/impl/webrtc/providers/livekit_broker.py 21.98% <21.98%> (ø)

... and 55 files with indirect coverage changes

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@ruthwikdasyam ruthwikdasyam marked this pull request as ready for review June 17, 2026 00:37
@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces LiveKitBrokerProvider as an alternative WebRTC transport backend that replaces the Cloudflare SDP-relay path with a LiveKit SFU room, eliminating SCTP-id juggling and SDP relay complexity. The four issues flagged in previous review rounds have all been resolved: _source is now cleared on publish failure for retry, reset() is called from _disconnect() to handle reconnects, heartbeat failures now log at WARNING, and _video._room is cleared via reset().

  • New provider (livekit_broker.py): mirrors BrokerProvider's lifecycle contract — lazy-connect on subscribe(), AsyncProviderBase stop/start, heartbeat loop, best-effort DELETE on disconnect — with LiveKit-specific data (publish_data) and video (VideoSource/LocalVideoTrack) paths.
  • Transport classes (LiveKitTransport / LiveKitVideoTransport): two-line subclasses that swap _config_cls, making backend selection a blueprint-level wiring choice with zero transport-layer changes.
  • New blueprint teleop-hosted-go2-livekit wires the Go2's cmd_vel and color_image topics through the new transport classes, activated solely by the TELEOP_API_KEY environment variable.

Confidence Score: 5/5

Safe to merge; the new LiveKit provider correctly mirrors the Cloudflare provider's stop/start lifecycle and all previously identified bugs have been addressed in this revision.

All four previously flagged issues (silent no-video on publish failure, stale source on reconnect, invisible heartbeat failures, dangling room reference) have been fixed. The new provider's connect/disconnect/reset cycle is structurally sound and consistent with BrokerProvider. Remaining observations are edge-case style points that do not affect correctness on the Go2 blueprint's fixed-resolution camera stream.

livekit_broker.py is the only file with any new observations, specifically around _VideoPublisher._capture not detecting resolution changes and use of asyncio.ensure_future.

Important Files Changed

Filename Overview
dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py New LiveKit broker provider — mirrors the Cloudflare BrokerProvider structure; previously-flagged issues (source reset on reconnect, silent publish failure, heartbeat logging, room reference leak) have all been addressed in this revision
dimos/core/transport.py Adds LiveKitTransport and LiveKitVideoTransport as drop-in subclasses of the existing WebRTC transport hierarchy; only change is wiring _config_cls = LiveKitBrokerConfig
dimos/teleop/quest_hosted/blueprints.py Adds teleop_hosted_go2_livekit blueprint wiring Go2 transport topics to LiveKit classes; mirrors the existing teleop_hosted_go2_transport blueprint cleanly
pyproject.toml Adds livekit optional extra (livekit>=1.0.0 + httpx>=0.27.0); mypy ignore entries added for livekit and livekit.*
dimos/robot/all_blueprints.py Registers the new teleop-hosted-go2-livekit blueprint in the global registry
dimos/robot/test_all_blueprints.py Correctly adds teleop-hosted-go2-livekit to SELF_HOSTED_BLUEPRINTS so it's excluded from the automated (no-hardware) blueprint test

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Robot
    participant LiveKitBrokerProvider
    participant Broker as dimensional-teleop broker
    participant LiveKitSFU as LiveKit SFU
    participant Operator

    Robot->>LiveKitBrokerProvider: start() / subscribe()
    LiveKitBrokerProvider->>Broker: "POST /api/v1/sessions {transport:"livekit"}"
    Broker-->>LiveKitBrokerProvider: "{session_id, url, token, room}"
    LiveKitBrokerProvider->>LiveKitSFU: Room.connect(url, token)
    LiveKitSFU-->>LiveKitBrokerProvider: connected
    LiveKitBrokerProvider->>LiveKitBrokerProvider: bind _VideoPublisher(room, loop)
    LiveKitBrokerProvider->>LiveKitBrokerProvider: start _heartbeat_loop()

    loop heartbeat (1Hz)
        LiveKitBrokerProvider->>Broker: "POST /api/v1/sessions/{id}/heartbeat"
    end

    Note over Robot,LiveKitSFU: Data channels (topics)
    Operator->>LiveKitSFU: data (cmd_unreliable / state_reliable)
    LiveKitSFU->>LiveKitBrokerProvider: data_received event → _dispatch()
    LiveKitBrokerProvider->>Robot: callback(payload, topic)

    Robot->>LiveKitBrokerProvider: publish(topic, data)
    LiveKitBrokerProvider->>LiveKitSFU: publish_data(data, reliable, topic)
    LiveKitSFU->>Operator: data

    Note over Robot,LiveKitSFU: Video track (lazy first frame)
    Robot->>LiveKitBrokerProvider: set_video_frame(img)
    LiveKitBrokerProvider->>LiveKitBrokerProvider: _image_to_rgba() → call_soon_threadsafe(_capture)
    LiveKitBrokerProvider->>LiveKitBrokerProvider: _capture: create VideoSource (first frame only)
    LiveKitBrokerProvider->>LiveKitSFU: publish_track(LocalVideoTrack)
    LiveKitBrokerProvider->>LiveKitSFU: capture_frame(VideoFrame)
    LiveKitSFU->>Operator: video stream

    Note over Robot,LiveKitSFU: Disconnect
    Robot->>LiveKitBrokerProvider: stop()
    LiveKitBrokerProvider->>Broker: "DELETE /api/v1/sessions/{id}"
    LiveKitBrokerProvider->>LiveKitSFU: Room.disconnect()
    LiveKitBrokerProvider->>LiveKitBrokerProvider: _video.reset()
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Robot
    participant LiveKitBrokerProvider
    participant Broker as dimensional-teleop broker
    participant LiveKitSFU as LiveKit SFU
    participant Operator

    Robot->>LiveKitBrokerProvider: start() / subscribe()
    LiveKitBrokerProvider->>Broker: "POST /api/v1/sessions {transport:"livekit"}"
    Broker-->>LiveKitBrokerProvider: "{session_id, url, token, room}"
    LiveKitBrokerProvider->>LiveKitSFU: Room.connect(url, token)
    LiveKitSFU-->>LiveKitBrokerProvider: connected
    LiveKitBrokerProvider->>LiveKitBrokerProvider: bind _VideoPublisher(room, loop)
    LiveKitBrokerProvider->>LiveKitBrokerProvider: start _heartbeat_loop()

    loop heartbeat (1Hz)
        LiveKitBrokerProvider->>Broker: "POST /api/v1/sessions/{id}/heartbeat"
    end

    Note over Robot,LiveKitSFU: Data channels (topics)
    Operator->>LiveKitSFU: data (cmd_unreliable / state_reliable)
    LiveKitSFU->>LiveKitBrokerProvider: data_received event → _dispatch()
    LiveKitBrokerProvider->>Robot: callback(payload, topic)

    Robot->>LiveKitBrokerProvider: publish(topic, data)
    LiveKitBrokerProvider->>LiveKitSFU: publish_data(data, reliable, topic)
    LiveKitSFU->>Operator: data

    Note over Robot,LiveKitSFU: Video track (lazy first frame)
    Robot->>LiveKitBrokerProvider: set_video_frame(img)
    LiveKitBrokerProvider->>LiveKitBrokerProvider: _image_to_rgba() → call_soon_threadsafe(_capture)
    LiveKitBrokerProvider->>LiveKitBrokerProvider: _capture: create VideoSource (first frame only)
    LiveKitBrokerProvider->>LiveKitSFU: publish_track(LocalVideoTrack)
    LiveKitBrokerProvider->>LiveKitSFU: capture_frame(VideoFrame)
    LiveKitSFU->>Operator: video stream

    Note over Robot,LiveKitSFU: Disconnect
    Robot->>LiveKitBrokerProvider: stop()
    LiveKitBrokerProvider->>Broker: "DELETE /api/v1/sessions/{id}"
    LiveKitBrokerProvider->>LiveKitSFU: Room.disconnect()
    LiveKitBrokerProvider->>LiveKitBrokerProvider: _video.reset()
Loading

Reviews (2): Last reviewed commit: "fix(webrtc): reset LiveKit video publish..." | Re-trigger Greptile

Comment thread dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py
Comment thread dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py
Comment thread dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py
Comment thread dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py
@github-actions github-actions Bot added the ready-to-merge Required CI checks have passed on this PR label Jun 17, 2026

broker_url: str | None = None
api_key: str | None = None
robot_id: str | None = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a list of robot_ids ?

Multiple robots teleop help us in cases like multiple instances of arms, example single user controlling 2 or 3 robots from a single session, each for specialized application.

Although I do think we should operate bimanual arms like openarm and humanoids as a single robot.

Comment thread pyproject.toml
Comment on lines 326 to +337
webrtc = [
# WebRTC DataChannel pubsub (Cloudflare Realtime SFU)
"aiortc>=1.14.0",
"httpx>=0.27.0",
]

livekit = [
# WebRTC pubsub over the hosted broker's LiveKit backend (robot rtc client).
"livekit>=1.0.0",
"httpx>=0.27.0",
]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider making a teleop group so we are not making granular groups

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ready-to-merge Required CI checks have passed on this PR

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants